filter_lookup: added filter for key value lookup #10620
Conversation
Test configuration

Fluent Bit YAML Configuration

To test the new filter we load a range of log values, including strings (different cases), integers, booleans, embedded quotes, and other value types.

devices.log:

```json
{"hostname": "server-prod-001"}
{"hostname": "Server-Prod-001"}
{"hostname": "db-test-abc"}
{"hostname": 123}
{"hostname": true}
{"hostname": " host with space "}
{"hostname": "quoted \"host\""}
{"hostname": "unknown-host"}
{}
{"hostname": [1,2,3]}
{"hostname": {"sub": "val"}}
{"hostname": " "}
```

The CSV configuration (device-bu.csv) aims to test key overwrites, different types of strings, and the use and escaping of quotes.
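For reference, a minimal pipeline sketch that would exercise the filter against these inputs. The `lookup_key` and `result_key` options come from this PR's description; the `file` option name, the `device_owner` key, and the paths are assumptions for illustration only:

```yaml
pipeline:
  inputs:
    - name: tail
      path: devices.log
      parser: json
  filters:
    - name: lookup
      match: '*'
      file: device-bu.csv      # CSV: first column = key, second = value (option name assumed)
      lookup_key: hostname
      result_key: device_owner # key added to the record on a match (name assumed)
  outputs:
    - name: stdout
      match: '*'
```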
When executed with the verbose flag, the following output is produced.

Test output

The output shows correct matching and handling of the different value types, and correct behavior when no match is detected.

Valgrind summary (after a run with multiple types of lookups):
Documentation for this filter has been submitted as fluent/fluent-bit-docs#1953.
Added unit tests for the lookup filter. All tests pass.

Valgrind results show appropriate memory management.
Added a fix for the failing checks on CentOS 7 and Windows. Please rerun.
The last check is failing due to a CentOS 7 incompatibility in the unit test file; the fix is in the last commit. Please rerun.
Could this please get one more run at the checks? I didn't realise we needed this to compile on CentOS 7; it should be good now with the last commit.
Can you rebase and push so it reruns tests?
Walkthrough

Adds a new CSV-backed "lookup" filter plugin with a build option and CMake wiring, implements the plugin source and header (CSV parsing, hash-table lookup, record accessor, metrics), and introduces comprehensive runtime tests gated on record accessor support.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant In as Input
    participant FL as filter_lookup
    participant CSV as CSV Loader
    participant HT as Hash Table
    participant Out as Output
    Note over FL: Initialization (cb_lookup_init)
    FL->>CSV: open CSV file & parse entries
    CSV->>HT: insert normalized key→value
    CSV-->>FL: return count
    loop per record/batch (cb_lookup_filter)
        In->>FL: encoded record(s)
        FL->>FL: extract lookup_key (record accessor)
        alt key present & HT hit
            FL->>HT: lookup(key)
            HT-->>FL: value
            FL->>FL: construct modified record (add result_key)
            FL-->>Out: emit modified record
        else miss or error
            FL-->>Out: emit original record
        end
    end
    Note over FL: Exit (cb_lookup_exit) — free RA, HT, stored values, metrics
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
Actionable comments posted: 7
🧹 Nitpick comments (10)
plugins/filter_lookup/lookup.h (1)
28-31
: Use an enum for metric IDs and confirm legacy metrics exposure

Minor: prefer an enum to group these IDs and avoid macro leakage. Also, since the runtime tests fetch metrics via flb_metrics_get_id(), please confirm the plugin populates the legacy metrics list (f_ins->metrics) in addition to cmetrics; otherwise tests on builds without FLB_HAVE_METRICS will fail.
Apply:
```diff
-#define FLB_LOOKUP_METRIC_PROCESSED 200
-#define FLB_LOOKUP_METRIC_MATCHED 201
-#define FLB_LOOKUP_METRIC_SKIPPED 202
+enum {
+    FLB_LOOKUP_METRIC_PROCESSED = 200,
+    FLB_LOOKUP_METRIC_MATCHED = 201,
+    FLB_LOOKUP_METRIC_SKIPPED = 202
+};
```

tests/runtime/filter_lookup.c (2)
125-145
: Don't assume null-terminated output; respect size

The lib output callback receives a buffer and length. Using strstr() on an assumed NUL-terminated buffer is brittle. Copy to a NUL-terminated scratch buffer first.
```diff
 static int cb_check_result_json(void *record, size_t size, void *data)
 {
     char *p;
     char *expected;
     char *result;

     expected = (char *) data;
-    result = (char *) record;
+    result = flb_malloc(size + 1);
+    if (!result) {
+        flb_free(record);
+        return -1;
+    }
+    memcpy(result, record, size);
+    result[size] = '\0';

     p = strstr(result, expected);
     TEST_CHECK(p != NULL);
     if (p == NULL) {
         flb_error("Expected to find: '%s' in result '%s'", expected, result);
     }

-    flb_free(record);
+    flb_free(result);
+    flb_free(record);
     return 0;
 }
```
449-497
: Strengthen the "no match" test: assert result_key is absent

Currently this test only checks that another field exists. Add a negative assertion to ensure the filter didn't inject result_key on misses.
```diff
+/* Callback to assert a substring is NOT present */
+static int cb_assert_not_contains(void *record, size_t size, void *data)
+{
+    char *needle = (char *) data;
+    char *buf = flb_malloc(size + 1);
+    if (!buf) {
+        flb_free(record);
+        return -1;
+    }
+    memcpy(buf, record, size);
+    buf[size] = '\0';
+    TEST_CHECK(strstr(buf, needle) == NULL);
+    if (strstr(buf, needle) != NULL) {
+        flb_error("Unexpected substring '%s' found in: '%s'", needle, buf);
+    }
+    flb_free(buf);
+    flb_free(record);
+    return 0;
+}
@@
-    /* Should NOT contain the result_key since no match was found */
-    cb_data.cb = cb_check_result_json;
-    cb_data.data = "\"other_field\":\"test\"";
+    /* Should NOT contain result_key since no match was found */
+    cb_data.cb = cb_assert_not_contains;
+    cb_data.data = "\"user_name\":";
```

plugins/filter_lookup/lookup.c (7)
766-768

: Float formatting may not match CSV keys; use compact representation.

`%f` prints 6 decimals (e.g., `1.0` → `1.000000`). Prefer `%.15g` to preserve intent and improve match rates.

```diff
-    case FLB_RA_FLOAT:
-        required_size = snprintf(NULL, 0, "%f", rval->o.via.f64);
+    case FLB_RA_FLOAT:
+        required_size = snprintf(NULL, 0, "%.15g", rval->o.via.f64);
         break;
...
-    case FLB_RA_FLOAT:
-        printed = snprintf(dynamic_val_buf, required_size + 1, "%f", rval->o.via.f64);
+    case FLB_RA_FLOAT:
+        printed = snprintf(dynamic_val_buf, required_size + 1, "%.15g", rval->o.via.f64);
         break;
```

Also applies to: 815-817
46-74

: Metrics macros: avoid static mutable storage and guard counter pointers.

`static char* labels_array[1];` is shared across threads. Also, `cmt_counter_create()` can fail; guard `ctx->cmt_*` in the macros.

```diff
-#define INCREMENT_SKIPPED_METRIC(ctx, ins) do { \
-    uint64_t ts = cfl_time_now(); \
-    static char* labels_array[1]; \
-    labels_array[0] = (char*)flb_filter_name(ins); \
-    cmt_counter_add(ctx->cmt_skipped, ts, 1, 1, labels_array); \
-    flb_metrics_sum(FLB_LOOKUP_METRIC_SKIPPED, 1, ins->metrics); \
-} while(0)
+#define INCREMENT_SKIPPED_METRIC(ctx, ins) do { \
+    uint64_t ts = cfl_time_now(); \
+    char* labels_array[1]; \
+    labels_array[0] = (char *) flb_filter_name(ins); \
+    if ((ctx)->cmt_skipped) { cmt_counter_add((ctx)->cmt_skipped, ts, 1, 1, labels_array); } \
+    if ((ins)->metrics) { flb_metrics_sum(FLB_LOOKUP_METRIC_SKIPPED, 1, (ins)->metrics); } \
+} while (0)
```

Apply the same pattern to `INCREMENT_MATCHED_METRIC` and `INCREMENT_PROCESSED_METRIC`.
529-547

: Defensive checks on metric creation.

If any `cmt_counter_create()` returns NULL, later increments will segfault without guards. Consider logging and continuing without those counters.

```diff
     ctx->cmt_processed = cmt_counter_create(ins->cmt,
                                             "fluentbit", "filter",
                                             "lookup_processed_records_total",
                                             "Total number of processed records",
                                             1, labels_name);
+    if (!ctx->cmt_processed) { flb_plg_warn(ins, "failed to create processed counter"); }
 ...
     ctx->cmt_matched = cmt_counter_create(ins->cmt,
                                           "fluentbit", "filter",
                                           "lookup_matched_records_total",
                                           "Total number of matched records",
                                           1, labels_name);
+    if (!ctx->cmt_matched) { flb_plg_warn(ins, "failed to create matched counter"); }
 ...
     ctx->cmt_skipped = cmt_counter_create(ins->cmt,
                                           "fluentbit", "filter",
                                           "lookup_skipped_records_total",
                                           "Total number of skipped records due to errors",
                                           1, labels_name);
+    if (!ctx->cmt_skipped) { flb_plg_warn(ins, "failed to create skipped counter"); }
```
261-269

: Always skipping the first CSV line assumes a header. Make this configurable.

Not all CSVs include a header; the first record would be dropped. Add a `skip_header` (bool) option or auto-detect, and document the behavior.

I can draft the config plumb-through and tests if you want it in this PR.
606-609

: Avoid relying on internal `ht->total_count`.

Accessing struct internals risks future ABI churn. Track a local "loaded entries" count in `load_csv()` and log that instead.
919-938

: Precompute `result_key` length and use `memcmp`.

Minor perf/readability tweak in the hot path.

```diff
     if (log_event.body && log_event.body->type == MSGPACK_OBJECT_MAP) {
         int i;
+        size_t rkey_len = strlen(ctx->result_key);
         for (i = 0; i < log_event.body->via.map.size; i++) {
             msgpack_object_kv *kv = &log_event.body->via.map.ptr[i];
             if (kv->key.type == MSGPACK_OBJECT_STR &&
-                kv->key.via.str.size == strlen(ctx->result_key) &&
-                strncmp(kv->key.via.str.ptr, ctx->result_key, kv->key.via.str.size) == 0) {
+                kv->key.via.str.size == rkey_len &&
+                memcmp(kv->key.via.str.ptr, ctx->result_key, rkey_len) == 0) {
                 continue;
             }
```
183-244

: CSV support limitations (multiline fields).

The reader splits on newlines before CSV parsing, so embedded newlines within quoted fields aren't supported. If this is out of scope, document it explicitly.

Also applies to: 291-417
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
cmake/plugins_options.cmake
(1 hunks)plugins/CMakeLists.txt
(1 hunks)plugins/filter_lookup/CMakeLists.txt
(1 hunks)plugins/filter_lookup/lookup.c
(1 hunks)plugins/filter_lookup/lookup.h
(1 hunks)tests/runtime/CMakeLists.txt
(1 hunks)tests/runtime/filter_lookup.c
(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
plugins/filter_lookup/lookup.c (7)

- include/fluent-bit/flb_mem.h (2): flb_free (126-128), flb_calloc (84-96)
- src/flb_hash_table.c (4): flb_hash_table_add (401-494), flb_hash_table_create (99-137), flb_hash_table_destroy (197-215), flb_hash_table_get (496-522)
- include/fluent-bit/flb_filter.h (1): flb_filter_config_map_set (125-129)
- src/flb_record_accessor.c (3): flb_ra_create (271-358), flb_ra_destroy (232-248), flb_ra_get_value_object (803-814)
- src/flb_log_event_encoder.c (7): flb_log_event_encoder_begin_record (246-254), flb_log_event_encoder_set_timestamp (276-287), flb_log_event_encoder_commit_record (256-274), flb_log_event_encoder_rollback_record (241-244), flb_log_event_encoder_init (42-74), flb_log_event_encoder_claim_internal_buffer_ownership (118-124), flb_log_event_encoder_destroy (99-116)
- src/flb_log_event_decoder.c (3): flb_log_event_decoder_init (99-116), flb_log_event_decoder_destroy (147-179), flb_log_event_decoder_next (310-406)
- src/flb_ra_key.c (1): flb_ra_key_value_destroy (842-851)

tests/runtime/filter_lookup.c (4)

- src/flb_lib.c (11): flb_create (138-220), flb_input (261-271), flb_input_set (300-330), flb_filter (287-297), flb_output (274-284), flb_output_set (515-546), flb_stop (942-985), flb_destroy (223-258), flb_filter_set (613-644), flb_start (914-925), flb_lib_push (774-801)
- include/fluent-bit/flb_mem.h (1): flb_free (126-128)
- plugins/filter_lookup/lookup.c (3): dynbuf_init (141-151), dynbuf_append_char (154-169), dynbuf_destroy (172-180)
- src/flb_metrics.c (1): flb_metrics_get_id (62-75)
🔇 Additional comments (3)
cmake/plugins_options.cmake (1)
89-89

: New lookup filter option — LGTM

The option is correctly added and follows the FLB_MINIMAL override behavior.
plugins/filter_lookup/lookup.c (2)
1-1028
: Overall: solid, careful implementation.

CSV parsing with proper escaping, case-insensitive matching via normalized keys, RA integration, encoder/decoder usage, and exhaustive cleanup paths are well-structured. Nice work.
990-1007
: Ownership verified — hashtable copies/frees its own buffer when val_size > 0 (no double-free).entry_set_value() (flb_hash_table.c) allocates and copies the supplied buffer when val_size > 0 and flb_hash_table_entry_free()/flb_hash_table_destroy() free that internal copy; the plugin allocates val_heap, stores it in ctx->val_list and frees it in cb_lookup_exit — these are distinct allocations, so the current cleanup is correct.
The new filter addresses the use case of simple data enrichment via a static key-value lookup. The filter loads the first two columns of a CSV file into memory as a hash table. When a specified record value matches a key in the hash table, the corresponding value is appended to the record (under the key name defined in the filter inputs). Tested with valgrind.

Signed-off-by: Oleg Mukhin <[email protected]>
- Removed unnecessary FLB_FILTER_LOOKUP build flag; the lookup filter is now enabled by default like other filters (without a flag).
- Fixed critical use-after-free bug in numeric value lookups.
- Added processed_records_total, matched_records_total and skipped_records_total metrics to enable operational visibility.
- Added unit tests to cover handling of different data types, CSV loading/handling, and metrics.

Tested with valgrind: no memory leaks. All unit tests pass.

Signed-off-by: Oleg Mukhin <[email protected]>
- Fix variable declarations and remove C99 features.
- Conditional compilation for Windows vs Unix headers/functions.
- Replace bool with int, fix format specifiers, update comments.

All 15 unit tests for the filter passed.

Signed-off-by: Oleg Mukhin <[email protected]>
- Fix variable declarations and remove C99 features for unit tests.
- Conditional compilation for Windows for unit test features.

All 15 unit tests for the filter passed.

Signed-off-by: Oleg Mukhin <[email protected]>
Addressed the following issues:

- Fix potential memory leak when val_node allocation fails
- Wrap test metrics code with FLB_HAVE_METRICS guards
- Replace metric macros with an enum to prevent namespace pollution
- Gate plugin registration on the FLB_RECORD_ACCESSOR option
- Add unmatched-quote detection after key parsing in the CSV loader
- Replace magic numbers with semantic msgpack type checking
- Fix thread safety in lookup filter metrics macros
- Eliminate potential segfaults from null pointer dereferences
- Add defensive checks to the metric creation code
- Optimise the hot path by eliminating repeated strlen calls

Signed-off-by: Oleg Mukhin <[email protected]>
@patrick-stephens rebaselined to the latest commit as advised, built/deployed without issues (related to the plug-in), and addressed all the new review comments from the AI reviewer in the last commit. Let me know if any further changes are required - thanks. I see some potential improvements (such as multiline CSV data support, JSON file support, support for occasionally checking for new data in lookup files, etc.), but I think these should be separate PRs to keep things clean.
Added a new LookUp filter to address use case when enrichment of record is required based on simple static key value lookup.
The filter loads a CSV file into a hash table for performance. It considers the first column of the CSV to be the key and the second column to be the value. All other columns are ignored.
Where a record value (identified by the `lookup_key` input) matches a key from the CSV, the value from that CSV row is added to the record under a new key (defined by the `result_key` input).

Enter [N/A] in the box if an item is not applicable to your change.

Testing
Before we can approve your change, please submit the following in a comment:

If this is a change to packaging of containers or native binaries then please confirm it works for all targets. Use the ok-package-test label to test all targets (requires a maintainer).

Documentation
Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.